Sumo LogicのNew Dashboardでフィルターを作ってみる
AWS CloudTrailのログを可視化したいと思っています。マルチアカウントを運用していて、アカウント毎のどのユーザーがAWSコンソールに何回ログインしたかを確認する必要があります。
Sumo LogicのビルトインのAppを少しカスタマイズしてこの要件を満たしていきたいと思います。
CloudTrailのログの可視化ができるダッシュボード(App)
Sumo Logicはログの取り込み設定をした後すぐに分析を始められるよう、240以上(2022年7月時点)のAppと呼ばれる、メジャーなインフラやアプリケーション向けのビルトインのダッシュボードや検索クエリセットが用意されています。
今回はCloudTrailのログに利用できる、「Amazon CloudTrail - Cloud Security Monitoring and Analytics」をインストールして、要件が満たせるか確認していきます。
(S3からCloudTrailのログをSumo Logicへ取り込むための設定については、こちらのブログをご参考ください。)
左メニューのApp Catalogを選択して、Amazon Web Services
のカテゴリを選択する、あるいは検索窓でcloud
とかで検索して、「Amazon CloudTrail - Cloud Security Monitoring and Analytics」をインストールしていきます。
Add Integrationを選択します。
Source Categoryを指定します。※Source Categoryはデータ収集設定の時に任意で指定可能なログのグループのようなものです。本番運用の時はしっかり設計することをお奨めします。Source Categoryの設計についてはこちらを参考にしてください。
Folder Nameは新規の場合は新しく入力するか、既存のフォルダ内に展開したい場合は下部のメニューから選択して、Nextに進みます。
下記の画面になっていれば、Appのインストールは完了です。
左メニューのPersonal FolderにAppが出来ていることを確認します。※反映されていない場合は、画面をリロードしてください。
New Dashboardのフィルターを使って、アカウント別やユーザー別でAWS管理コンソールへのログイン回数を確認していく
インストールしたAppのフォルダを展開して、AWS管理コンソールのログイン状況が確認できるダッシュボード(Amazon CloudTrail - Security Analytics - Login Activity)を開きます。
その中の「Console Login Activity」というセクションがあるのですが、何と、可視化ができていません。
「No Data」と表示されているので、データ自体の取り込みに失敗しているのかなと思ってしまいそうなのですが、他のグラフは表示できているのでデータは取り込めていそうです。他のダッシュボードも全て可視化できていないような場合は、そもそもデータが取り込めていなかったり、Appの対象ではないデータを指定しまっている可能性があります。そちらを確認してください。
また、ダッシュボードの裏では検索クエリが書かれていて、検索文の条件に当てはまらないログがたまたま出ていないためにデータが表示されていなかったり、検索対象の期間中にログが発生していないような場合もあります。
No Dataの原因をTrouble Shootingしてみる
検索クエリを詳しく調べていきましょう。
まず、Console Login Failuresパネルの三点リーダーからOpen in Log Searchを見ていきます。
このパネルを表示するための検索クエリが書かれています。本来、下部に出力が出ていなければならないのですが、表示されていません。
クエリ文を見ていきます。
_sourceCategory=Labs/AWS/CloudTrail ConsoleLogin AwsConsoleSignIn Failure | json "awsRegion", "recipientAccountId" as aws_region, recipient_acc_id nodrop | parse "\"eventSource\":\"*\"" as event_source nodrop | parse "\"eventName\":\"*\"" as event_name nodrop | parse "\"eventType\":\"*\"" as event_Type nodrop | parse "\"awsRegion\":\"*\"" as aws_Region nodrop | parse "\"sourceIPAddress\":\"*\"" as source_ipaddress nodrop | parse "\"userName\":\"*\"" as user nodrop | parse "\"errorMessage\":\"*\"" as errorMessage nodrop | parse "\"errorCode\":\"*\"" as errorCode nodrop | parse "\"principalId\":\"*\"" as principalId nodrop | parse "\"MFAUsed\":\"*\"" as mfaUsed nodrop | parse "\"responseElements\":{\"ConsoleLogin\":\"*\"}" as loginResult nodrop | parse field=errorMessage " Error Code: *; Request ID" as errorCode2 nodrop | parse "\"accountId\":\"*\"" as accountId nodrop | if (isEmpty(errorCode), errorCode2, errorCode) as errorCode | source_ipaddress as src_ip | user as dest_user | where recipient_acc_id matches "*" and aws_region matches "*" | where event_name="ConsoleLogin" and event_type="AwsConsoleSignIn" and loginResult="Failure" | fields src_ip, dest_user, mfaUsed, event_Type, event_name, errorCode, errorMessage, principalId, aws_region, source_ipaddress, accountId | timeslice 15m | count as eventCount by _timeslice | sort _timeslice
Sumo Logicの検索クエリでは、ログの絞り込みは主に「Parse(ログ構造の解析)」に関連するオペレーター(2〜15行目)か「Where(条件検索)」のオペレーター(18〜19行目)によって行われます。
Whereについては、「ある特定のフィールドの値が○○のものを結果に表示する」という風に使えることができ、SQLライクなところもあってピンときやすいのですが、少し注意が必要なのは、Parse系のオペレーターによっても絞り込みが行われるという点です。
このクエリ文だと、json
とparse
がログ構造の解析のためのオペレーターになります。Parse系オペレーターは「1.正規表現やパターンなどでログを構造化して、後のクエリ文でフィールドとして、集計や処理をするために利用する」、「2.正規表現やパターンなどに合致しなかったログを検索結果から除外する」の2つの役割を持っています。
ただし、「nodrop」のオプションがついている場合は、2の機能をスキップして、合致しなかったログについても検索結果に表示させることができます。
ですので、検索結果が表示されていない原因はParse系オペレーターの2の機能によるものではないことが分かりました。
次に先程のクエリの最後のWhere(19行目)について注目してみます。
| where recipient_acc_id matches "*" and aws_region matches "*" | where event_name="ConsoleLogin" and event_type="AwsConsoleSignIn" and loginResult="Failure" | fields src_ip, dest_user, mfaUsed, event_Type, event_name, errorCode, errorMessage, principalId, aws_region, source_ipaddress, accountId
この条件フィルタに合致するログが存在していないのかもしれません。19行目以降を消して、検索してみます。
出力が出ましたので、この文が関連していそうです。
where event_name="ConsoleLogin" and event_type="AwsConsoleSignIn" and loginResult="Failure"
where
を読み解くと、フィールド「event_name」の値が「ConsoleLogin」かつ、「event_type」が「AwsConsoleSignIn」かつ、「loginResult」が「Failure」のレコードに絞り込んでいます。
先程の19行目以降を消した検索結果を確認してみると、event_name、event_typeの値は確かに出ていますが、loginResultの値は空です。この「loginResult」のフィールドの値が「Failure」ではなかったので、where文の条件に合致せずにダッシュボードの表示ができなかったことになります。さて、本当にログの中に「loginResult」のフィールドの値が「Failure」のレコードは無かったのでしょうか。
少し疑ってさらに検索結果を右方向にシフトして確認していくと、元の生ログのメッセージが確認できる箇所があります。そこを確認すると、responseElementsのキーの中に、「ConsoleLogin: "Failure"」という文字が確認できます。
もう一度、クエリ文に戻って「loginResult」のフィールドにパースしている箇所のクエリに注目してみます。
| parse "\"MFAUsed\":\"*\"" as mfaUsed nodrop | parse "\"responseElements\":{\"ConsoleLogin\":\"*\"}" as loginResult nodrop | parse field=errorMessage " Error Code: *; Request ID" as errorCode2 nodrop
するとresponseElementsのキーの中に、「ConsoleLogin: ○○」というパターンをloginResultのフィールドとしてパースしていることが分かります。生のログからは、「ConsoleLogin: "Failure"」となっていたので、本来「loginResult」のフィールドの値は「Failure」となっていそうですが、空白でしたのでパース自体が誤っている可能性があります。
「loginResult」のフィールドを正しく取り出すパースについて考えてみます。いくつか方法はありますが、(jsonオペレーター)を利用していきます。
jsonオペレーターの構文は"json <キー> as <フィールド名>"でパースすることができます。また、jsonの入れ子になっている場合は、<キー名>.<キー名>とすることで内側のパースが可能です。
ですので、以下のようなパースに修正して検索し直してみます。(13行目です)
_sourceCategory=Labs/AWS/CloudTrail ConsoleLogin AwsConsoleSignIn Failure | json "awsRegion", "recipientAccountId" as aws_region, recipient_acc_id nodrop | parse "\"eventSource\":\"*\"" as event_source nodrop | parse "\"eventName\":\"*\"" as event_name nodrop | parse "\"eventType\":\"*\"" as event_Type nodrop | parse "\"awsRegion\":\"*\"" as aws_Region nodrop | parse "\"sourceIPAddress\":\"*\"" as source_ipaddress nodrop | parse "\"userName\":\"*\"" as user nodrop | parse "\"errorMessage\":\"*\"" as errorMessage nodrop | parse "\"errorCode\":\"*\"" as errorCode nodrop | parse "\"principalId\":\"*\"" as principalId nodrop | parse "\"MFAUsed\":\"*\"" as mfaUsed nodrop | json "responseElements.ConsoleLogin" as loginResult nodrop | parse field=errorMessage " Error Code: *; Request ID" as errorCode2 nodrop | parse "\"accountId\":\"*\"" as accountId nodrop | if (isEmpty(errorCode), errorCode2, errorCode) as errorCode | source_ipaddress as src_ip | user as dest_user | where recipient_acc_id matches "*" and aws_region matches "*" | where event_name="ConsoleLogin" and event_type="AwsConsoleSignIn" and loginResult="Failure" | fields src_ip, dest_user, mfaUsed, event_Type, event_name, errorCode, errorMessage, principalId, aws_region, source_ipaddress, accountId | timeslice 15m | count as eventCount by _timeslice | sort _timeslice
すると、いけました。きちんとパースされて、「loginResult」が「Failure」となっている値を見つけることができたようです。条件に合致するログがなかったのではなく、パースが誤っていたために可視化できていなかったことになります。
ちなみに、ダッシュボードを更新したい場合は、ダッシュボード画面のパネルの三点リーダーからEditから開くことができるクエリ文を修正して保存する必要があります。
こちらに先程のクエリをコピペして保存します。
残りの3つのパネルも同じ原因なので、同じ箇所を修正してダッシュボードを更新します。正しくデータが表示されるようになりました。
ダッシュボードのフィルターを使ってみる
ダッシュボードのフィルターについては、ダッシュボード上部に用意されています。アカウントのフィルタリングに関しては、このダッシュボードに既に用意されているので、こちらを使って見たいアカウントにフィルターすることができます。
ところが、ユーザー毎のフィルターということになると残念ながら用意されていませんでした。ですので自分で作っちゃいましょう。
フィルターの作成は、フィルターの右端の+から作成していくことができます。
すると、フィルターを作成するためのエディターが開きます。
設定していく項目について説明していきます。
Variable Name
: フィルターとダッシュボード内のフィルター条件とを結びつけるための変数を定義します。ここで設定した変数名をダッシュボードのクエリ文に条件として含めることでフィルターすることができます。{{変数名}}という記述方法になります。Variable Type
: フィルターに使う変数をどのタイプのクエリの中で利用するかを指定します。どのタイプを選ぶかによって他の設定項目が若干異なります。Query
: Variable TypeでLogs Searchを選択した時に設定できます。フィルターを利用する時のオートコンプリートの候補として選択ができるようにクエリ文を書くことができます。Key
: Variable TypeでLogs Searchを選択した時に設定できます。Queryで出力されたどのキーの値をフィルターの候補とするかを指定することができます。Include the option to select all values (\*)
: フィルターの条件に「*」(全部)を選択可能とするかどうかを決めます。Default Values (optional)
: デフォルト値のフィルター条件を設定します。
では値を埋めていきます。
Variable Name
:{{dest_user}}
Variable Type
:Logs Search
Query
:_sourceCategory=Labs/AWS/CloudTrail | parse "\"userName\":\"*\"" as user | user as dest_user | count by dest_user
Key
:dest_user
Include the option to select all values (\*)
:ON
Default Values (optional)
:\*
Save Template Variableでフィルターを作成します。
すると、新しいフィルターが画面に反映されました。
最後にダッシュボードのクエリ文の中にフィルターの条件となる変数(Variable Name)を埋め込みます。パネルのEditを選択します。
18行目のwhere文がフィルター条件となる部分になります。ここの末尾に新たにユーザーでフィルターするための変数の条件をANDで結んであげます。(and dest_user matches "{{dest_user}}"
)
_sourceCategory=Labs/AWS/CloudTrail ConsoleLogin AwsConsoleSignIn Failure | json "awsRegion", "recipientAccountId" as aws_region, recipient_acc_id nodrop | parse "\"eventSource\":\"*\"" as event_source nodrop | parse "\"eventName\":\"*\"" as event_name nodrop | parse "\"eventType\":\"*\"" as event_Type nodrop | parse "\"awsRegion\":\"*\"" as aws_Region nodrop | parse "\"sourceIPAddress\":\"*\"" as source_ipaddress nodrop | parse "\"userName\":\"*\"" as user nodrop | parse "\"errorMessage\":\"*\"" as errorMessage nodrop | parse "\"errorCode\":\"*\"" as errorCode nodrop | parse "\"principalId\":\"*\"" as principalId nodrop | parse "\"MFAUsed\":\"*\"" as mfaUsed nodrop | json "responseElements.ConsoleLogin" as loginResult nodrop | parse field=errorMessage " Error Code: *; Request ID" as errorCode2 nodrop | parse "\"accountId\":\"*\"" as accountId nodrop | if (isEmpty(errorCode), errorCode2, errorCode) as errorCode | source_ipaddress as src_ip | user as dest_user | where recipient_acc_id matches "{{aws_account}}" and aws_region matches "{{aws_region}}" and dest_user matches "{{dest_user}}" | where event_name="ConsoleLogin" and event_type="AwsConsoleSignIn" and loginResult="Failure" | fields src_ip, dest_user, mfaUsed, event_Type, event_name, errorCode, errorMessage, principalId, aws_region, source_ipaddress, accountId | timeslice 15m | count as eventCount by _timeslice | sort _timeslice
これも同じように他のパネルに適用してダッシュボードを更新します。
するとついに、フィルターを使って、アカウントごと・ユーザーごとにフィルターしてダッシュボードを使うことができるようになりました。
まとめ
このようにダッシュボードを適切なクエリに修正して、追加のフィルター機能を付け加えることができました。New Dashboardのフィルターは非常に使いやすく高機能ですので、色んな場面で活用することが期待できそうです。
ぜひ参考にしていただき、ログ分析によるセキュリティ強化を高めるのにお役立ちください。